package karma.pool.pool;

import karma.pool.Configuration;
import karma.pool.pool.proxy.ProxyConnection;
import karma.pool.util.DriverDataSource;
import karma.pool.util.PropertyElf;
import karma.pool.util.UtilityElf;
import karma.pool.util.UtilityElf.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;

import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.sql.DataSource;
import java.lang.management.ManagementFactory;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static karma.pool.util.UtilityElf.createInstance;
import static karma.pool.util.clock.ClockFactory.currentTime;
import static karma.pool.util.clock.ClockFactory.elapsedMillis;

@Slf4j
abstract class AbstractPool {
   // Human-readable names used by stringFromResetBits(): bit i of the reset mask maps to
   // RESET_STATES[i]. NOTE(review): the order presumably mirrors the bit positions of the
   // ProxyConnection.dirty_status_* constants -- confirm against ProxyConnection.
   private static final String[] RESET_STATES = {//
      "readOnly",//
      "autoCommit",//
      "isolation",//
      "catalog",//
      "netTimeout",//
      "schema"};//
   // Marker for "not determined yet"; used by networkTimeout and the two *Supported flags below.
   private static final int UNINITIALIZED = -1;
   // Tri-state "supported" value for isNetworkTimeoutSupported / isQueryTimeoutSupported.
   private static final int TRUE = 1;
   // Tri-state "not supported" value for isNetworkTimeoutSupported / isQueryTimeoutSupported.
   private static final int FALSE = 0;
   // The configuration this pool was constructed with.
   public final Configuration configuration;
   // Taken from configuration.getPoolName(); used as the prefix of every log message and by toString().
   protected final String poolName;
   // Most recent exception seen while creating or validating a connection;
   // reset to null by newConnection() when a connection is created successfully.
   final AtomicReference<Exception> lastConnectionFailure;

   // Configured schema applied to new connections and restored by resetConnectionState(); may be null.
   private final String schema;
   // Configured read-only default for connections.
   private final boolean isReadOnly;
   // Configured auto-commit default for connections.
   private final boolean isAutoCommit;
   // True when no connection test query is configured, in which case Connection.isValid() is used.
   private final boolean isUseJdbc4Validation;
   // When true (and auto-commit is off) internal queries are followed by an explicit commit/rollback.
   private final boolean isIsolateInternalQueries;
   // Receives connection-creation timings; not assigned in this class and may be null (newConnection() checks).
   RecorderDelegate recorderDelegate;
   // Configured catalog applied to new connections and restored by resetConnectionState(); may be null.
   volatile String catalog;
   // Connection timeout in milliseconds (converted to seconds for DataSource.setLoginTimeout()).
   long connectionTimeout;
   // Validation timeout in milliseconds; used as network timeout while setting up or validating a connection.
   long validationTimeout;
   // Network timeout reported by the driver on the first connection; UNINITIALIZED until setupConnection() runs.
   private int networkTimeout;
   // Tri-state (UNINITIALIZED / TRUE / FALSE): whether the driver supports get/setNetworkTimeout().
   private int isNetworkTimeoutSupported;
   // Tri-state (UNINITIALIZED / TRUE / FALSE): whether the driver supports Statement.setQueryTimeout().
   private int isQueryTimeoutSupported;
   // Isolation level reported by the driver, read once in checkDefaultIsolation().
   private int defaultTransactionIsolation;
   // Configured isolation level; a value of -1 is replaced by the driver default in checkDefaultIsolation().
   private int transactionIsolationLevel;
   // Executor handed to Connection.setNetworkTimeout(); created by createNetworkTimeoutExecutor().
   private Executor netTimeoutExecutor;
   // The DataSource resolved by initializeDataSource(); null if the configuration yields none.
   private DataSource dataSource;
   // Set to true once checkDriverSupport() has completed its one-time driver probing.
   private volatile boolean isValidChecked;

   /**
    * Captures the pool settings from the given configuration, marks the driver
    * capabilities as not yet probed, and finally resolves the underlying DataSource.
    *
    * @param configuration the configuration to read all pool settings from
    */
   AbstractPool(final Configuration configuration) {
      this.configuration = configuration;

      // Identity, timeouts and failure tracking.
      this.poolName = configuration.getPoolName();
      this.connectionTimeout = configuration.getConnectionTimeout();
      this.validationTimeout = configuration.getValidationTimeout();
      this.lastConnectionFailure = new AtomicReference<>();

      // Default state every pooled connection is brought into.
      this.catalog = configuration.getCatalog();
      this.schema = configuration.getSchema();
      this.isReadOnly = configuration.isReadOnly();
      this.isAutoCommit = configuration.isAutoCommit();
      this.transactionIsolationLevel = UtilityElf.getTransactionIsolation(configuration.getTransactionIsolation());

      // Validation strategy: without a test query, Connection.isValid() is used.
      this.isUseJdbc4Validation = configuration.getConnectionTestQuery() == null;
      this.isIsolateInternalQueries = configuration.isIsolateInternalQueries();

      // Driver capabilities are discovered lazily on the first connection.
      this.networkTimeout = UNINITIALIZED;
      this.isQueryTimeoutSupported = UNINITIALIZED;
      this.isNetworkTimeoutSupported = UNINITIALIZED;

      // Must run last: it reads poolName, connectionTimeout and configuration.
      initializeDataSource();
   }

   /**
    * Returns the name of this pool.
    *
    * @return the pool name taken from the configuration
    */
   @Override
   public String toString() {
      return this.poolName;
   }

   /**
    * Hook to be implemented by the concrete pool; it is not called from within this class.
    * NOTE(review): presumably hands the given entry back to the pool -- confirm against the subclass.
    *
    * @param poolEntry the pool entry to recycle
    */
   abstract void recycle(final PoolEntry poolEntry);

   // ***********************************************************************
   //                           JDBC methods
   // ***********************************************************************

   /**
    * Closes the given connection on a best-effort basis. Any failure is logged at
    * debug level and never propagated to the caller.
    *
    * @param connection    the connection to close; {@code null} is ignored
    * @param closureReason a description of why the connection is closed, used for logging only
    */
   void quietlyCloseConnection(final Connection connection, final String closureReason) {
      if (connection == null) {
         return;
      }

      try {
         log.debug("{} - Closing proxyConnection {}: {}", poolName, connection, closureReason);

         try {
            // Limit how long the close itself may block on the network.
            setNetworkTimeout(connection, SECONDS.toMillis(15));
         } catch (SQLException ignored) {
            // Failing to set the timeout must not prevent the close below.
         } finally {
            connection.close();
         }
      } catch (Exception closeFailure) {
         log.debug("{} - Closing proxyConnection {} failed", poolName, connection, closeFailure);
      }
   }

   /**
    * Validates the given connection, either through {@link Connection#isValid(int)} (when no
    * test query is configured) or by executing the configured connection test query.
    * <p>
    * While validating, the network timeout is set to {@code validationTimeout}; afterwards it is
    * restored to the pool's network timeout and, if internal queries are isolated and auto-commit
    * is off, the connection is rolled back. Any exception is recorded as the last connection
    * failure and reported as "not alive".
    *
    * @param connection the connection to validate
    * @return {@code true} if validation succeeded, {@code false} if the driver reported the
    *         connection as invalid or any exception occurred
    */
   boolean isConnectionAlive(final Connection connection) {
      try {
         try {
            setNetworkTimeout(connection, validationTimeout);

            // Convert to whole seconds (at least 1) in long arithmetic and only then narrow to int;
            // casting before dividing would truncate timeouts larger than Integer.MAX_VALUE ms.
            final long validationSecondsLong = Math.max(1000L, validationTimeout) / 1000L;
            final int validationSeconds = (int) Math.min(Integer.MAX_VALUE, validationSecondsLong);

            if (isUseJdbc4Validation) {
               return connection.isValid(validationSeconds);
            }

            try (Statement statement = connection.createStatement()) {
               // Fall back to a query timeout when the network timeout is not known to be supported.
               if (isNetworkTimeoutSupported != TRUE) {
                  setQueryTimeout(statement, validationSeconds);
               }

               statement.execute(configuration.getConnectionTestQuery());
            }
         } finally {
            setNetworkTimeout(connection, networkTimeout);

            if (isIsolateInternalQueries && !isAutoCommit) {
               connection.rollback();
            }
         }

         return true;
      } catch (Exception e) {
         lastConnectionFailure.set(e);
         log.warn("{} - Failed to validate proxyConnection {} ({}). Possibly consider using a shorter maxLifetime value.",
            poolName, connection, e.getMessage());
         return false;
      }
   }

   /**
    * Returns the most recently recorded connection failure.
    *
    * @return the last recorded exception, or {@code null} if none is currently recorded
    */
   Exception getLastConnectionFailure() {
      final Exception mostRecentFailure = lastConnectionFailure.get();
      return mostRecentFailure;
   }

   /**
    * Returns the DataSource resolved during construction of this pool.
    *
    * @return the underlying DataSource, or {@code null} if none could be resolved
    */
   public DataSource getUnwrappedDataSource() {
      return this.dataSource;
   }


   /**
    * Creates a new pool entry wrapping a freshly opened and set-up connection.
    *
    * @return a new pool entry bound to this pool
    * @throws Exception if the connection cannot be created or set up
    */
   PoolEntry newPoolEntry() throws Exception {
      final Connection freshConnection = newConnection();
      return new PoolEntry(freshConnection, this, isReadOnly, isAutoCommit);
   }

   /**
    * Restores the pool defaults on a connection for every property that is flagged in
    * {@code dirtyStatus} and whose current value (as reported by the proxy) differs from
    * the pool default. The properties that were actually reset are logged at debug level.
    *
    * @param connection      the underlying connection to reset
    * @param proxyConnection the proxy that tracks the current state of the connection
    * @param dirtyStatus     bit mask of {@code ProxyConnection.dirty_status_*} flags
    * @throws SQLException if the driver fails to apply one of the settings
    */
   void resetConnectionState(final Connection connection, final ProxyConnection proxyConnection, final int dirtyStatus) throws SQLException {
      int restoredBits = 0;

      if ((dirtyStatus & ProxyConnection.dirty_status_readonly) != 0) {
         if (proxyConnection.isReadOnly() != isReadOnly) {
            connection.setReadOnly(isReadOnly);
            restoredBits |= ProxyConnection.dirty_status_readonly;
         }
      }

      if ((dirtyStatus & ProxyConnection.dirty_status_autocommit) != 0) {
         if (proxyConnection.isAutoCommit() != isAutoCommit) {
            connection.setAutoCommit(isAutoCommit);
            restoredBits |= ProxyConnection.dirty_status_autocommit;
         }
      }

      if ((dirtyStatus & ProxyConnection.dirty_status_transaction_isolation_level) != 0) {
         if (proxyConnection.getTransactionIsolation() != transactionIsolationLevel) {
            connection.setTransactionIsolation(transactionIsolationLevel);
            restoredBits |= ProxyConnection.dirty_status_transaction_isolation_level;
         }
      }

      // Catalog is only restored when the pool has a configured catalog.
      if ((dirtyStatus & ProxyConnection.dirty_status_catalog) != 0) {
         if (catalog != null && !catalog.equals(proxyConnection.getCatalog())) {
            connection.setCatalog(catalog);
            restoredBits |= ProxyConnection.dirty_status_catalog;
         }
      }

      if ((dirtyStatus & ProxyConnection.dirty_status_nettimeout) != 0) {
         if (proxyConnection.getNetworkTimeout() != networkTimeout) {
            setNetworkTimeout(connection, networkTimeout);
            restoredBits |= ProxyConnection.dirty_status_nettimeout;
         }
      }

      // Schema is only restored when the pool has a configured schema.
      if ((dirtyStatus & ProxyConnection.dirty_status_schema) != 0) {
         if (schema != null && !schema.equals(proxyConnection.getSchema())) {
            connection.setSchema(schema);
            restoredBits |= ProxyConnection.dirty_status_schema;
         }
      }

      if (restoredBits == 0) {
         return;
      }

      if (log.isDebugEnabled()) {
         log.debug("{} - Reset ({}) on proxyConnection {}", poolName, stringFromResetBits(restoredBits), connection);
      }
   }

   /**
    * Shuts down the network timeout executor immediately if it is a thread pool;
    * any other kind of executor (or none) is left untouched.
    */
   void shutdownNetworkTimeoutExecutor() {
      final Executor executor = netTimeoutExecutor;
      if (!(executor instanceof ThreadPoolExecutor)) {
         return;
      }

      final ThreadPoolExecutor threadPool = (ThreadPoolExecutor) executor;
      threadPool.shutdownNow();
   }

   /**
    * Returns the login timeout of the underlying DataSource.
    *
    * @return the DataSource login timeout, or 5 if there is no DataSource or the
    *         DataSource fails to report its timeout
    */
   long getLoginTimeout() {
      final long fallbackSeconds = SECONDS.toSeconds(5);
      if (dataSource == null) {
         return fallbackSeconds;
      }

      try {
         return dataSource.getLoginTimeout();
      } catch (SQLException e) {
         return fallbackSeconds;
      }
   }

   // ***********************************************************************
   //                       JMX methods
   // ***********************************************************************

   /**
    * Applies the pool's connection timeout as login timeout on the given DataSource.
    * The timeout is converted from milliseconds to seconds, rounded to the nearest
    * second and never set below one second. Nothing is done when the connection
    * timeout equals {@code Integer.MAX_VALUE}; a failure is logged and otherwise ignored.
    *
    * @param dataSource the DataSource to configure
    */
   private void setLoginTimeout(final DataSource dataSource) {
      if (connectionTimeout == Integer.MAX_VALUE) {
         return;
      }

      try {
         // Adding 500 ms before the truncating conversion rounds to the nearest second.
         final int roundedSeconds = (int) MILLISECONDS.toSeconds(500L + connectionTimeout);
         dataSource.setLoginTimeout(Math.max(1, roundedSeconds));
      } catch (Exception e) {
         log.info("{} - Failed to set login timeout for data source. ({})", poolName, e.getMessage());
      }
   }

   // ***********************************************************************
   //                          Private methods
   // ***********************************************************************

   /**
    * Registers or unregisters the management beans for the configuration and the pool
    * on the platform MBean server. Does nothing when MBean registration is disabled in
    * the configuration. Failures are logged as warnings and not propagated.
    *
    * @param pool     the pool instance to expose as a management bean
    * @param register {@code true} to register the beans, {@code false} to unregister them
    */
   void handleMBeans(final Pool pool, final boolean register) {
      if (!configuration.isRegisterMbeans()) {
         return;
      }

      try {
         final MBeanServer server = ManagementFactory.getPlatformMBeanServer();

         final ObjectName configBeanName = new ObjectName("com.zaxxer.hikari:type=PoolConfig (" + poolName + ")");
         final ObjectName poolBeanName = new ObjectName("com.zaxxer.hikari:type=Pool (" + poolName + ")");

         // The configuration bean serves as the marker for "this pool is already registered".
         final boolean alreadyRegistered = server.isRegistered(configBeanName);

         if (register) {
            if (alreadyRegistered) {
               log.error("{} - JMX name ({}) is already registered.", poolName, poolName);
            } else {
               server.registerMBean(configuration, configBeanName);
               server.registerMBean(pool, poolBeanName);
            }
         } else if (alreadyRegistered) {
            server.unregisterMBean(configBeanName);
            server.unregisterMBean(poolBeanName);
         }
      } catch (Exception e) {
         log.warn("{} - Failed to {} management beans.", poolName, (register ? "register" : "unregister"), e);
      }
   }

   /**
    * Resolves the underlying DataSource from the configuration and stores it.
    * <p>
    * An explicitly configured DataSource instance wins. Otherwise the first applicable
    * source is used, in this order: DataSource class name, JDBC URL, JNDI name. A resolved
    * DataSource gets the login timeout applied and the network timeout executor created.
    *
    * @throws Pool.PoolInitializationException if the JNDI lookup fails
    */
   private void initializeDataSource() {
      final String url = configuration.getJdbcUrl();
      final String user = configuration.getUsername();
      final String secret = configuration.getPassword();
      final String dataSourceClassName = configuration.getDataSourceClassName();
      final String driverClass = configuration.getDriverClassName();
      final String jndiName = configuration.getDataSourceJNDI();
      final Properties props = configuration.getDataSourceProperties();

      DataSource resolved = configuration.getDataSource();
      if (resolved == null) {
         if (dataSourceClassName != null) {
            resolved = createInstance(dataSourceClassName, DataSource.class);
            PropertyElf.setTargetFromProperties(resolved, props);
         } else if (url != null) {
            resolved = new DriverDataSource(url, driverClass, props, user, secret);
         } else if (jndiName != null) {
            try {
               final InitialContext context = new InitialContext();
               resolved = (DataSource) context.lookup(jndiName);
            } catch (NamingException e) {
               throw new Pool.PoolInitializationException(e);
            }
         }
      }

      if (resolved != null) {
         setLoginTimeout(resolved);
         createNetworkTimeoutExecutor(resolved, dataSourceClassName, url);
      }

      this.dataSource = resolved;
   }

   /**
    * Obtains a new connection from the DataSource and brings it into the pool's initial state.
    * <p>
    * On success the last recorded connection failure is cleared. On failure a partially
    * created connection is closed, the exception is recorded as the last connection failure
    * and rethrown. The elapsed creation time is reported to the recorder in both cases.
    *
    * @return a fully set-up connection, never {@code null}
    * @throws Exception if the DataSource fails, returns {@code null}, or the connection setup fails
    */
   private Connection newConnection() throws Exception {
      final long start = currentTime();

      Connection connection = null;
      try {
         final String username = configuration.getUsername();
         final String password = configuration.getPassword();

         connection = (username == null) ? dataSource.getConnection() : dataSource.getConnection(username, password);
         if (connection == null) {
            // Fail with a descriptive error instead of a NullPointerException inside setupConnection().
            throw new SQLException("DataSource returned null unexpectedly");
         }

         setupConnection(connection);
         lastConnectionFailure.set(null);
         return connection;
      } catch (Exception e) {
         if (connection != null) {
            quietlyCloseConnection(connection, "(Failed to create/setup proxyConnection)");
         } else if (getLastConnectionFailure() == null) {
            // Only log the first failure of a series; later ones are available via lastConnectionFailure.
            log.debug("{} - Failed to create/setup proxyConnection: {}", poolName, e.getMessage());
         }

         lastConnectionFailure.set(e);
         throw e;
      } finally {
         // recorder will be null during failFast check
         if (recorderDelegate != null) {
            recorderDelegate.recordConnectionCreated(elapsedMillis(start));
         }
      }
   }

   /**
    * Brings a freshly created connection into the pool's initial state: read-only flag,
    * auto-commit, transaction isolation, catalog, schema, and the configured init SQL.
    * <p>
    * While the setup runs, the network timeout is set to {@code validationTimeout}; it is
    * set back to the pool's network timeout only when the setup completes successfully.
    *
    * @param connection a Connection
    * @throws ConnectionSetupException wraps any SQLException raised during setup; other
    *                                  exception types are not caught here
    */
   private void setupConnection(final Connection connection) throws ConnectionSetupException {
      try {
         // The very first connection also captures the driver's current network timeout,
         // which becomes the value restored after internal operations.
         if (networkTimeout == UNINITIALIZED) {
            networkTimeout = getAndSetNetworkTimeout(connection, validationTimeout);
         } else {
            setNetworkTimeout(connection, validationTimeout);
         }

         if (connection.isReadOnly() != isReadOnly) {
            connection.setReadOnly(isReadOnly);
         }

         if (connection.getAutoCommit() != isAutoCommit) {
            connection.setAutoCommit(isAutoCommit);
         }

         // Must precede the isolation comparison below: on first use it fills in
         // defaultTransactionIsolation (and may replace transactionIsolationLevel).
         checkDriverSupport(connection);

         if (transactionIsolationLevel != defaultTransactionIsolation) {
            connection.setTransactionIsolation(transactionIsolationLevel);
         }

         if (catalog != null) {
            connection.setCatalog(catalog);
         }

         if (schema != null) {
            connection.setSchema(schema);
         }

         // No-op when no init SQL is configured (executeSql ignores a null statement).
         executeSql(connection, configuration.getConnectionInitSql(), true);

         // Setup is done: switch from the validation timeout back to the pool's network timeout.
         setNetworkTimeout(connection, networkTimeout);
      } catch (SQLException e) {
         throw new ConnectionSetupException(e);
      }
   }

   /**
    * Runs the driver probing (validation support and default transaction isolation)
    * unless it has already completed before.
    *
    * @param connection a Connection to probe the driver with
    * @throws SQLException if one of the probes fails
    */
   private void checkDriverSupport(final Connection connection) throws SQLException {
      if (isValidChecked) {
         return;
      }

      checkValidationSupport(connection);
      checkDefaultIsolation(connection);

      // Only reached when both probes completed without throwing.
      isValidChecked = true;
   }

   /**
    * Verifies that the configured validation mechanism works on this driver: either
    * {@code Connection.isValid()} or, if one is configured, the connection test query.
    * A failure is logged as an error and rethrown.
    *
    * @param connection a Connection to check
    * @throws SQLException rethrown from the driver
    */
   private void checkValidationSupport(final Connection connection) throws SQLException {
      try {
         if (!isUseJdbc4Validation) {
            executeSql(connection, configuration.getConnectionTestQuery(), false);
         } else {
            connection.isValid(1);
         }
      } catch (Exception | AbstractMethodError e) {
         // When isValid() is what failed, the message advises configuring a test query.
         final String hint = isUseJdbc4Validation ? " isValid() for proxyConnection, configure" : "";
         log.error("{} - Failed to execute{} proxyConnection test query ({}).", poolName, hint, e.getMessage());
         throw e;
      }
   }

   /**
    * Reads the driver's default transaction isolation and, if no isolation level is
    * configured (-1), adopts the driver default as the pool's isolation level.
    * <p>
    * A detection failure is logged as a warning. It is rethrown unless the exception has
    * no SQLState or its SQLState starts with "08".
    *
    * @param connection a Connection to check
    * @throws SQLException rethrown from the driver
    */
   private void checkDefaultIsolation(final Connection connection) throws SQLException {
      try {
         final int driverDefault = connection.getTransactionIsolation();
         defaultTransactionIsolation = driverDefault;
         if (transactionIsolationLevel == -1) {
            transactionIsolationLevel = driverDefault;
         }
      } catch (SQLException e) {
         log.warn("{} - Default transaction isolation level detection failed ({}).", poolName, e.getMessage());

         final String sqlState = e.getSQLState();
         final boolean tolerated = sqlState == null || sqlState.startsWith("08");
         if (!tolerated) {
            throw e;
         }
      }
   }

   /**
    * Sets the query timeout on a statement unless the driver is already known not to
    * support it. The first failure marks query timeouts as unsupported and is logged;
    * failures after a previous success are ignored silently.
    *
    * @param statement  a proxyStatement to set the query timeout on
    * @param timeoutSec the number of seconds before timeout
    */
   private void setQueryTimeout(final Statement statement, final int timeoutSec) {
      if (isQueryTimeoutSupported == FALSE) {
         return;
      }

      try {
         statement.setQueryTimeout(timeoutSec);
         isQueryTimeoutSupported = TRUE;
      } catch (Exception e) {
         if (isQueryTimeoutSupported != UNINITIALIZED) {
            return;
         }

         isQueryTimeoutSupported = FALSE;
         log.info("{} - Failed to set query timeout for proxyStatement. ({})", poolName, e.getMessage());
      }
   }

   /**
    * Sets the network timeout on a connection and returns the value that was in effect
    * before, unless the driver is already known not to support network timeouts.
    * <p>
    * The first failure marks network timeouts as unsupported and logs it, together with a
    * warning if the configured validation timeout cannot be honored without this support.
    *
    * @param connection the proxyConnection to set the network timeout on
    * @param timeoutMs  the number of milliseconds before timeout
    * @return the pre-existing network timeout value, or 0 if it could not be determined
    */
   private int getAndSetNetworkTimeout(final Connection connection, final long timeoutMs) {
      if (isNetworkTimeoutSupported == FALSE) {
         return 0;
      }

      try {
         final int previousTimeout = connection.getNetworkTimeout();
         connection.setNetworkTimeout(netTimeoutExecutor, (int) timeoutMs);
         isNetworkTimeoutSupported = TRUE;
         return previousTimeout;
      } catch (Exception | AbstractMethodError e) {
         if (isNetworkTimeoutSupported == UNINITIALIZED) {
            isNetworkTimeoutSupported = FALSE;

            log.info("{} - Driver does not support get/set network timeout for connections. ({})", poolName, e.getMessage());

            final long oneSecondMs = SECONDS.toMillis(1);
            if (validationTimeout < oneSecondMs) {
               log.warn("{} - A validationTimeout of less than 1 second cannot be honored on drivers without setNetworkTimeout() support.", poolName);
            } else if (validationTimeout % oneSecondMs != 0) {
               log.warn("{} - A validationTimeout with fractional second granularity cannot be honored on drivers without setNetworkTimeout() support.", poolName);
            }
         }

         return 0;
      }
   }

   /**
    * Sets the network timeout on a connection, but only if the driver has already been
    * found to support network timeouts; otherwise this is a no-op.
    *
    * @param connection the proxyConnection to set the network timeout on
    * @param timeoutMs  the number of milliseconds before timeout
    * @throws SQLException throw if the proxyConnection.setNetworkTimeout() call throws
    */
   private void setNetworkTimeout(final Connection connection, final long timeoutMs) throws SQLException {
      if (isNetworkTimeoutSupported != TRUE) {
         return;
      }

      final int timeoutMillis = (int) timeoutMs;
      connection.setNetworkTimeout(netTimeoutExecutor, timeoutMillis);
   }

   /**
    * Executes an internal SQL statement (the user-specified init SQL or the connection
    * test query). If internal queries are isolated and auto-commit is off, the statement
    * is followed by an explicit commit or rollback.
    *
    * @param connection the proxyConnection to run the statement on
    * @param sql        the SQL to execute; {@code null} means there is nothing to do
    * @param isCommit   whether to commit ({@code true}) or roll back ({@code false}) afterwards
    * @throws SQLException throws if the SQL execution, commit or rollback fails
    */
   private void executeSql(final Connection connection, final String sql, final boolean isCommit) throws SQLException {
      if (sql == null) {
         return;
      }

      // proxyConnection was created a few milliseconds before, so set query timeout is omitted (we assume it will succeed)
      try (Statement statement = connection.createStatement()) {
         statement.execute(sql);
      }

      if (!isIsolateInternalQueries || isAutoCommit) {
         return;
      }

      if (isCommit) {
         connection.commit();
      } else {
         connection.rollback();
      }
   }

   /**
    * Creates the executor that is handed to {@code Connection.setNetworkTimeout()}.
    * For MySQL (detected by DataSource class name or JDBC URL) a SynchronousExecutor is
    * used; otherwise a cached thread pool whose idle threads time out after 15 seconds.
    *
    * @param dataSource  the resolved DataSource
    * @param dsClassName the configured DataSource class name, may be {@code null}
    * @param jdbcUrl     the configured JDBC URL, may be {@code null}
    */
   private void createNetworkTimeoutExecutor(final DataSource dataSource, final String dsClassName, final String jdbcUrl) {
      // Temporary hack for MySQL issue: http://bugs.mysql.com/bug.php?id=75615
      final boolean isMysql = (dsClassName != null && dsClassName.contains("Mysql"))
         || (jdbcUrl != null && jdbcUrl.contains("mysql"))
         || (dataSource != null && dataSource.getClass().getName().contains("Mysql"));

      if (!isMysql) {
         ThreadFactory factory = configuration.getThreadFactory();
         if (factory == null) {
            factory = new DefaultThreadFactory(poolName + " network timeout executor", true);
         }

         final ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newCachedThreadPool(factory);
         threadPool.setKeepAliveTime(15, SECONDS);
         threadPool.allowCoreThreadTimeOut(true);
         netTimeoutExecutor = threadPool;
      } else {
         netTimeoutExecutor = new SynchronousExecutor();
      }
   }

   /**
    * This will create a string for debug logging. Given a set of "reset bits", this
    * method will return a concatenated string, for example:
    * <p>
    * Input : 0b00110
    * Output: "autoCommit, isolation"
    * <p>
    * Bits that have no entry in {@code RESET_STATES} are ignored; if no known bit is set,
    * an empty string is returned.
    *
    * @param resetDirtyStatus a set of "reset bits"
    * @return a comma-separated string of which states were reset, possibly empty
    */
   private String stringFromResetBits(final int resetDirtyStatus) {
      final StringBuilder sb = new StringBuilder();
      for (int index = 0; index < RESET_STATES.length; index++) {
         if ((resetDirtyStatus & (0b1 << index)) != 0) {
            // Separate entries up front instead of trimming a trailing separator afterwards,
            // which would fail on an empty builder when no known bit is set.
            if (sb.length() > 0) {
               sb.append(", ");
            }
            sb.append(RESET_STATES[index]);
         }
      }

      return sb.toString();
   }


}
